{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Time series forecasting with DeepAR - Synthetic Data (Managed Spot Training)\n",
    "\n",
    "1. [Introduction](#Introduction)\n",
    "2. [Setup](#Setup)\n",
    "3. [Generating and uploading data](#Generating-and-uploading-data)\n",
    "4. [Train a model](#Train-a-model)\n",
    "    1. [Configuring the Estimator for Managed Spot Training](#Configuring-the-Estimator-for-Managed-Spot-Training)\n",
    "    2. [Review Spot Savings](#Review-Spot-Savings)\n",
    "5. [Create endpoint and predictor](#Create-endpoint-and-predictor)\n",
    "6. [Make predictions and plot results](#Make-predictions-and-plot-results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Introduction\n",
    "\n",
    "DeepAR is a supervised learning algorithm for forecasting scalar time series. This notebook demonstrates how to prepare a dataset of time series for training DeepAR and how to use the trained model for inference. More details on the DeepAR algorithm can be found here: https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html.\n",
    "\n",
    "This notebook was tested in Amazon SageMaker Studio on ml.t3.medium instance with Python 3 (Data Science) kernel.\n",
    "\n",
    "This example is extended to demonstrate leveraging Managed Spot Training with Amazon SageMaker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import json\n",
    "import matplotlib.pyplot as plt\n",
    "import boto3\n",
    "import sagemaker\n",
    "from sagemaker import get_execution_role\n",
    "\n",
    "np.random.seed(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's start by specifying:\n",
    "- The S3 bucket prefix that you want to use for training and model data. Here we use the default bucket with `sagemaker_session.default_bucket()`, but you can change this to a bucket of your choosing. This should be within the same region as the Notebook Instance, training, and hosting.\n",
    "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these. Here we use the `get_execution_role` function to obtain the role arn which was specified when creating the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "isConfigCell": true
   },
   "outputs": [],
   "source": [
    "prefix = \"sagemaker/SPOT-DEMO-deepar\"\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "\n",
    "s3_data_path = f\"{bucket}/{prefix}/data\"\n",
    "s3_output_path = f\"{bucket}/{prefix}/output\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we configure the container image to be used for the region that we are running in."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker import image_uris\n",
    "\n",
    "training_image_uri = image_uris.retrieve(\n",
    "    region=boto3.Session().region_name, framework=\"forecasting-deepar\"\n",
    ")\n",
    "print(training_image_uri)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generating and uploading data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this toy example we want to train a model that can predict the next 48 points of syntheticly generated time series.\n",
    "The time series that we use have hourly granularity."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "freq = 'H'\n",
    "prediction_length = 48"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We also need to configure the so-called `context_length`, which determines how much context of the time series the model should take into account when making the prediction, i.e. how many previous points to look at. A typical value to start with is around the same size as the `prediction_length`. In our example we will use a longer `context_length` of `72`. Note that in addition to the `context_length` the model also takes into account the values of the time series at typical seasonal windows e.g. for hourly data the model will look at the value of the series 24h ago, one week ago one month ago etc. So it is not necessary to make the `context_length` span an entire month if you expect monthly seasonalities in your hourly data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "context_length = 72"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For this notebook, we will generate 200 noisy time series, each consisting of 400 data points and with seasonality of 24 hours. In our dummy example, all time series start at the same time point `t0`. When preparing your data, it is important to use the correct start point for each time series, because the model uses the time-point as a frame of reference, which enables it to learn e.g. that weekdays behave differently from weekends."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "t0 = '2016-01-01 00:00:00'\n",
    "data_length = 400\n",
    "num_ts = 200\n",
    "period = 24"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each time series will be a noisy sine wave with a random level. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "time_series = []\n",
    "for k in range(num_ts):\n",
    "    level = 10 * np.random.rand()\n",
    "    seas_amplitude = (0.1 + 0.3 * np.random.rand()) * level\n",
    "    sig = 0.05 * level  # noise parameter (constant in time)\n",
    "    time_ticks = np.array(range(data_length))\n",
    "    source = level + seas_amplitude * np.sin(time_ticks * (2 * np.pi) / period)\n",
    "    noise = sig * np.random.randn(data_length)\n",
    "    data = source + noise\n",
    "    index = pd.date_range(start=t0, freq=freq, periods=data_length)\n",
    "    time_series.append(pd.Series(data=data, index=index))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "time_series[0].plot()\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Often one is interested in tuning or evaluating the model by looking at error metrics on a hold-out set. For other machine learning tasks such as classification, one typically does this by randomly separating examples into train/test sets. For forecasting it is important to do this train/test split in time rather than by series.\n",
    "\n",
    "In this example, we will leave out the last section of each of the time series we just generated and use only the first part as training data. Here we will predict 48 data points, therefore we take out the trailing 48 points from each time series to define the training set. The test set contains the full range of each time series."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "time_series_training = []\n",
    "for ts in time_series:\n",
    "    time_series_training.append(ts[:-prediction_length])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "time_series[0].plot(label='test')\n",
    "time_series_training[0].plot(label='train', ls=':')\n",
    "plt.legend()\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following utility functions convert `pandas.Series` objects into the appropriate JSON strings that DeepAR can consume. We will use these to write the data to S3."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def series_to_obj(ts, cat=None):\n",
    "    obj = {\"start\": str(ts.index[0]), \"target\": list(ts)}\n",
    "    if cat is not None:\n",
    "        obj[\"cat\"] = cat\n",
    "    return obj\n",
    "\n",
    "def series_to_jsonline(ts, cat=None):\n",
    "    return json.dumps(series_to_obj(ts, cat))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "encoding = \"utf-8\"\n",
    "FILE_TRAIN = \"train.json\"\n",
    "FILE_TEST = \"test.json\"\n",
    "with open(FILE_TRAIN, \"wb\") as f:\n",
    "    for ts in time_series_training:\n",
    "        f.write(series_to_jsonline(ts).encode(encoding))\n",
    "        f.write(\"\\n\".encode(encoding))\n",
    "\n",
    "with open(FILE_TEST, \"wb\") as f:\n",
    "    for ts in time_series:\n",
    "        f.write(series_to_jsonline(ts).encode(encoding))\n",
    "        f.write(\"\\n\".encode(encoding))\n",
    "s3 = boto3.client(\"s3\")\n",
    "s3.upload_file(FILE_TRAIN, bucket, prefix + \"/data/train/\" + FILE_TRAIN)\n",
    "s3.upload_file(FILE_TEST, bucket, prefix + \"/data/test/\" + FILE_TRAIN)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train a model\n",
    "\n",
    "Now that we are done with all the setup that is needed, we are ready to train our object detector. To begin, let us create a ``sageMaker.estimator.Estimator`` object. This estimator will launch the training job."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Configuring the Estimator for Managed Spot Training\n",
    "\n",
    "We can now define the estimator that will launch the training job. Since we want to leverage Spot Instances for our training job, there are a few configuration parameters we need to pass when we create our Estimator.\n",
    "\n",
    "* use_spot_instances - Specifies whether to use SageMaker Managed Spot instances for training. If enabled then the train_max_wait arg should also be set.\n",
    "\n",
    "* max_wait - Timeout in seconds waiting for spot training instances (default: None). After this amount of time Amazon SageMaker will stop waiting for Spot instances to become available\n",
    "\n",
    "* max_run - Timeout in seconds for training (default: 24 * 60 * 60). After this amount of time Amazon SageMaker terminates the job regardless of its current status.\n",
    "\n",
    "More details on configuration parameters can be found here: https://sagemaker.readthedocs.io/en/stable/estimators.html"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator = sagemaker.estimator.Estimator(\n",
    "    sagemaker_session=sagemaker_session,\n",
    "    image_uri=training_image_uri,\n",
    "    role=role,\n",
    "    instance_count=1,\n",
    "    instance_type='ml.c4.xlarge',\n",
    "    use_spot_instances=True,\n",
    "    max_wait=3600,\n",
    "    max_run=3600,\n",
    "    base_job_name='SPOT-deepar',\n",
    "    output_path=f\"s3://{s3_output_path}\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we need to set some hyperparameters: for example, frequency of the time series used, number of data points the model will look at in the past, number of predicted data points. The other hyperparameters concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options such as number of epochs, batch size, and learning rate. Refer to the documentation for a full description of the available parameters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "hyperparameters = {\n",
    "    \"time_freq\": freq,\n",
    "    \"context_length\": str(context_length),\n",
    "    \"prediction_length\": str(prediction_length),\n",
    "    \"num_cells\": \"40\",\n",
    "    \"num_layers\": \"3\",\n",
    "    \"likelihood\": \"gaussian\",\n",
    "    \"epochs\": \"20\",\n",
    "    \"mini_batch_size\": \"32\",\n",
    "    \"learning_rate\": \"0.001\",\n",
    "    \"dropout_rate\": \"0.05\",\n",
    "    \"early_stopping_patience\": \"10\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator.set_hyperparameters(**hyperparameters)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are ready to launch the training job. SageMaker will start an EC2 instance, download the data from S3, start training the model and save the trained model.\n",
    "\n",
    "If you provide the `test` data channel, as we do in this example, DeepAR will also calculate accuracy metrics for the trained model on this test data set. This is done by predicting the last `prediction_length` points of each time series in the test set and comparing this to the actual value of the time series. The computed error metrics will be included as part of the log output.\n",
    "\n",
    "**Note:** the next cell may take a few minutes to complete, depending on data size, model complexity, and training options."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_channels = {\n",
    "    \"train\": f\"s3://{s3_data_path}/train/\", \n",
    "    \"test\": f\"s3://{s3_data_path}/test/\"\n",
    "}\n",
    "\n",
    "estimator.fit(inputs=data_channels)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Review Spot Savings\n",
    "\n",
    "By leveraging Managed Spot Training for SageMaker we can use EC2 Spot Instances for our training jobs, and take advantage of significant savings. Review the output from the training job displayed above you see your Managed Spot Training savings for this training job. You can adjust the instance type used for training as needed and re-run the training job to compare results against different types of instances."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create endpoint and predictor"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have trained a model, we can use it to perform predictions by deploying it to an endpoint.\n",
    "\n",
    "**Note:** remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job_name = estimator.latest_training_job.name\n",
    "\n",
    "endpoint_name = sagemaker_session.endpoint_from_job(\n",
    "    job_name=job_name,\n",
    "    initial_instance_count=1,\n",
    "    instance_type=\"ml.m4.xlarge\",\n",
    "    image_uri=training_image_uri,\n",
    "    role=role,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To query the endpoint and perform predictions, we can define the following utility class: this allows making requests using `pandas.Series` objects rather than raw JSON strings."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class DeepARPredictor(sagemaker.predictor.Predictor):\n",
    "    def set_prediction_parameters(self, freq, prediction_length):\n",
    "        \"\"\"Set the time frequency and prediction length parameters. This method **must** be called\n",
    "        before being able to use `predict`.\n",
    "\n",
    "        Parameters:\n",
    "        freq -- string indicating the time frequency\n",
    "        prediction_length -- integer, number of predicted time points\n",
    "\n",
    "        Return value: none.\n",
    "        \"\"\"\n",
    "        self.freq = freq\n",
    "        self.prediction_length = prediction_length\n",
    "\n",
    "    def predict(\n",
    "        self,\n",
    "        ts,\n",
    "        cat=None,\n",
    "        encoding=\"utf-8\",\n",
    "        num_samples=100,\n",
    "        quantiles=[\"0.1\", \"0.5\", \"0.9\"],\n",
    "        content_type=\"application/json\",\n",
    "    ):\n",
    "        \"\"\"Requests the prediction of for the time series listed in `ts`, each with the (optional)\n",
    "        corresponding category listed in `cat`.\n",
    "\n",
    "        Parameters:\n",
    "        ts -- list of `pandas.Series` objects, the time series to predict\n",
    "        cat -- list of integers (default: None)\n",
    "        encoding -- string, encoding to use for the request (default: \"utf-8\")\n",
    "        num_samples -- integer, number of samples to compute at prediction time (default: 100)\n",
    "        quantiles -- list of strings specifying the quantiles to compute (default: [\"0.1\", \"0.5\", \"0.9\"])\n",
    "\n",
    "        Return value: list of `pandas.DataFrame` objects, each containing the predictions\n",
    "        \"\"\"\n",
    "        prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]\n",
    "        req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)\n",
    "        res = super(DeepARPredictor, self).predict(req, initial_args={\"ContentType\": content_type})\n",
    "        return self.__decode_response(res, prediction_times, encoding)\n",
    "\n",
    "    def __encode_request(self, ts, cat, encoding, num_samples, quantiles):\n",
    "        instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]\n",
    "        configuration = {\n",
    "            \"num_samples\": num_samples,\n",
    "            \"output_types\": [\"quantiles\"],\n",
    "            \"quantiles\": quantiles,\n",
    "        }\n",
    "        http_request_data = {\"instances\": instances, \"configuration\": configuration}\n",
    "        return json.dumps(http_request_data).encode(encoding)\n",
    "\n",
    "    def __decode_response(self, response, prediction_times, encoding):\n",
    "        response_data = json.loads(response.decode(encoding))\n",
    "        list_of_df = []\n",
    "        for k in range(len(prediction_times)):\n",
    "            prediction_index = pd.date_range(\n",
    "                start=prediction_times[k], freq=self.freq, periods=self.prediction_length\n",
    "            )\n",
    "            list_of_df.append(\n",
    "                pd.DataFrame(\n",
    "                    data=response_data[\"predictions\"][k][\"quantiles\"], index=prediction_index\n",
    "                )\n",
    "            )\n",
    "        return list_of_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)\n",
    "predictor.set_prediction_parameters(freq, prediction_length)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Make predictions and plot results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can use the previously created `predictor` object. For simplicity, we will predict only the first few time series used for training, and compare the results with the actual data we kept in the test set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "list_of_df = predictor.predict(time_series_training[:5])\n",
    "actual_data = time_series[:5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for k in range(len(list_of_df)):\n",
    "    plt.figure(figsize=(12, 6))\n",
    "    actual_data[k][-prediction_length - context_length :].plot(label=\"target\")\n",
    "    p10 = list_of_df[k][\"0.1\"]\n",
    "    p90 = list_of_df[k][\"0.9\"]\n",
    "    plt.fill_between(p10.index, p10, p90, color=\"y\", alpha=0.5, label=\"80% confidence interval\")\n",
    "    list_of_df[k][\"0.5\"].plot(label=\"prediction median\")\n",
    "    plt.legend()\n",
    "    plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Delete Endpoint"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sagemaker_session.delete_endpoint(endpoint_name)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.13"
  },
  "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.  Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
